KAFKA-16909 Refactor GroupCoordinatorConfig with AbstractConfig #16458

Merged: 10 commits merged into apache:trunk on Jul 1, 2024

Conversation

brandboat
Member

Related to https://issues.apache.org/jira/browse/KAFKA-16909

As the title says, this PR follows the approach of RemoteLogManagerConfig.java and passes an AbstractConfig to the constructor.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor

@chia7712 chia7712 left a comment

@brandboat thanks for this patch

config.consumerGroupMigrationPolicy,
config.offsetsTopicCompressionType
)
val groupCoordinatorConfig = new GroupCoordinatorConfig(config)

Contributor

Could we add a method to KafkaConfig to return GroupCoordinatorConfig? With that change, we can remove all GroupCoordinatorConfig-related getters from KafkaConfig

Member Author

Thank you for the suggestion! I have added the method to KafkaConfig to return GroupCoordinatorConfig and removed all related getters. This change is included in the latest commits.

@brandboat brandboat marked this pull request as draft June 29, 2024 13:48
@brandboat brandboat marked this pull request as ready for review June 29, 2024 14:05
Contributor

@chia7712 chia7712 left a comment

@brandboat thanks for this patch

@@ -2576,15 +2577,15 @@ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() {

  OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
      .withGroupMetadataManager(groupMetadataManager)
-     .withOffsetsRetentionMs(1000)
+     .withOffsetsRetentionMinutes(1)

Contributor

Not sure why we need this change. Also, the value changes from 1 second to 1 minute?

Member Author

@brandboat brandboat Jun 30, 2024

Actually, the original time unit of the config offsets.retention.minutes is minutes. Before this refactor, we passed the offsets retention in milliseconds to the GroupCoordinatorConfig constructor and used it that way in OffsetMetadataManagerTest. Now we pass an AbstractConfig (i.e. KafkaConfig) to GroupCoordinatorConfig, which means the test has to supply the offsets retention in minutes instead of milliseconds.
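
A minimal sketch of the unit change described in this comment, assuming a plain map in place of AbstractConfig. `RetentionConfigSketch` and its accessor are hypothetical names for illustration only; just the key `offsets.retention.minutes` comes from the discussion.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a config class; this is NOT Kafka's GroupCoordinatorConfig.
final class RetentionConfigSketch {
    // Config key named in the discussion; its value is expressed in minutes.
    static final String OFFSETS_RETENTION_MINUTES = "offsets.retention.minutes";

    private final Map<String, Object> props;

    RetentionConfigSketch(Map<String, Object> props) {
        // Defensive copy so later changes to the caller's map are not observed.
        this.props = new HashMap<>(props);
    }

    // Callers still work in milliseconds; the conversion happens in one place.
    long offsetsRetentionMs() {
        int minutes = (Integer) props.get(OFFSETS_RETENTION_MINUTES);
        return TimeUnit.MINUTES.toMillis(minutes);
    }
}
```

Because the raw property is in whole minutes, the smallest positive retention a test can configure this way is one minute, which is why the test value moves from 1000 ms to 1 minute.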

configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, ConsumerGroupMigrationPolicy.DISABLED.name());
configs.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, (int) CompressionType.NONE.id);

return new GroupCoordinatorConfig(new GroupCoordinatorTestConfig(configs));

Contributor

        return new GroupCoordinatorConfig(new AbstractConfig(Utils.mergeConfigs(Arrays.asList(
            GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF,
            GroupCoordinatorConfig.NEW_GROUP_CONFIG_DEF,
            GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF,
            GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF)),
            configs, false));

WDYT? With this, we don't need the temporary class.

Member Author

Will do, thanks for the suggestion.

val groupCoordinatorAppendLingerMs = getInt(GroupCoordinatorConfig.GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG)

/** Consumer group configs */
val consumerGroupSessionTimeoutMs = getInt(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)

Contributor

@OmniaGM Could you please take a look? Does it follow your idea that we should move getters out of KafkaConfig?

Contributor

Sorry for the late response here, this looks great! I think the validators could be moved out as well.

Contributor

Sure, we will file a MINOR for it

Contributor

@chia7712 chia7712 left a comment

LGTM

@chia7712 chia7712 merged commit 206d0f8 into apache:trunk Jul 1, 2024
1 check failed
@brandboat brandboat deleted the KAFKA-16909 branch July 1, 2024 15:32
Contributor

@dajac dajac left a comment

@brandboat @chia7712 Thanks for the PR. Overall, I agree with the change. However, I have left a few suggestions for consideration. Please let me know what you think.

Comment on lines +351 to +354
public ConsumerGroupMigrationPolicy consumerGroupMigrationPolicy() {
return ConsumerGroupMigrationPolicy.parse(
config.getString(GroupCoordinatorConfig.CONSUMER_GROUP_MIGRATION_POLICY_CONFIG));
}

Contributor

I would like to point out that this change is a bit risky in my opinion, because it does not ensure at startup that the migration policy is actually valid. If parsing fails for some reason, it will only fail later, when consumerGroupMigrationPolicy is accessed for the first time. I wonder if we should keep local attributes and initialize them in the constructor.

Contributor

The validation is addressed by the GroupCoordinatorConfig's config definition.

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala#L182

Hence, the string value is already known to be valid when GroupCoordinatorConfig is constructed.

Contributor

Indeed. I was thinking about the case where the validation has a bug or is not good enough.
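
The fail-fast idea raised in this thread can be sketched as follows. This is only an illustration under stated assumptions: `MigrationPolicySketch`, `EagerConfigSketch`, and the key `sketch.migration.policy` are hypothetical, and the enum lists an illustrative subset of values rather than Kafka's actual ConsumerGroupMigrationPolicy.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical enum with an illustrative subset of values.
enum MigrationPolicySketch {
    DISABLED, BIDIRECTIONAL;

    // Case-insensitive parse; throws IllegalArgumentException on bad input.
    static MigrationPolicySketch parse(String name) {
        if (name == null) {
            throw new IllegalArgumentException("migration policy must not be null");
        }
        return valueOf(name.trim().toUpperCase(Locale.ROOT));
    }
}

// Hypothetical config class that parses eagerly and keeps the result as an attribute.
final class EagerConfigSketch {
    static final String POLICY_KEY = "sketch.migration.policy"; // hypothetical key

    private final MigrationPolicySketch policy;

    EagerConfigSketch(Map<String, String> props) {
        // Parsing in the constructor surfaces a bad value at startup,
        // even if the upstream validator missed it.
        this.policy = MigrationPolicySketch.parse(props.get(POLICY_KEY));
    }

    // Cheap accessor: no lookup or parse on the hot path.
    MigrationPolicySketch policy() {
        return policy;
    }
}
```

With a lazy getter, the same invalid string would only throw on the first call to the accessor, which could be long after the broker has started.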

Comment on lines +344 to +346
public int offsetCommitTimeoutMs() {
return config.getInt(GroupCoordinatorConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
}

Contributor

This is an example of a config that is accessed extremely frequently (there are others like this too). I think that having attributes would be better, as it avoids having to look the value up in the config every time.

Contributor

This is a good question!

The approach of this PR is to make sure GroupCoordinatorConfig can see the latest configs to avoid potential bugs (#16394). However, it has a side effect that brings extra cost: "volatile" and "lookup/parse".

  • The cost of "volatile" (or another similar synchronization trick) is unavoidable if we want GroupCoordinatorConfig to see the latest configs.
  • The cost of "lookup/parse" could be eliminated with a bit of refactoring. For example, we could pass Supplier<GroupCoordinatorConfig> instead of GroupCoordinatorConfig to the callers. With that change, GroupCoordinatorConfig can keep all of its values as immutable, pre-created local attributes. The implementation of Supplier<GroupCoordinatorConfig> would be provided by KafkaConfig and would look like AtomicReference::get. However, the side effect is that the usage gets a little ugly: "config.numThreads()" -> "config.get().numThreads()"

WDYT? BTW, we had a related discussion in https://issues.apache.org/jira/browse/KAFKA-17001
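
A rough sketch of the Supplier idea above, using only the JDK. `CoordinatorSnapshotSketch` and `SnapshotHolderSketch` are hypothetical names; the holder plays the role the comment assigns to KafkaConfig, and nothing here is the actual Kafka implementation.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical immutable snapshot: every value is parsed once and stored in a final field.
final class CoordinatorSnapshotSketch {
    private final int numThreads;

    CoordinatorSnapshotSketch(int numThreads) {
        this.numThreads = numThreads;
    }

    int numThreads() {
        return numThreads;
    }
}

// Hypothetical owner of the latest snapshot.
final class SnapshotHolderSketch {
    private final AtomicReference<CoordinatorSnapshotSketch> current;

    SnapshotHolderSketch(CoordinatorSnapshotSketch initial) {
        this.current = new AtomicReference<>(initial);
    }

    // A reconfiguration publishes a whole new immutable snapshot.
    void update(CoordinatorSnapshotSketch next) {
        current.set(next);
    }

    // Callers hold the supplier, not a snapshot, so they always see the latest one.
    Supplier<CoordinatorSnapshotSketch> supplier() {
        return current::get;
    }
}
```

Callers then write `config.get().numThreads()`: the only synchronization cost is the single atomic read in `get()`, while each accessor on the snapshot is a plain field read.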

Contributor

None of the configs in this class are dynamic so I am not sure that it is worth it. We could perhaps mention it in the javadoc of the class. If we introduce a dynamic config, we should indeed not use an attribute for it.

For context, in KafkaConfig we have always had the distinction between val (static values) and def (dynamic ones). We could do the same here, I suppose.
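
In Java terms, the val/def distinction could look roughly like the sketch below: a static value is copied into a final field at construction, while a dynamic value is looked up on every call. The class name and both keys are hypothetical and exist only for illustration.

```java
import java.util.Map;

// Hypothetical config wrapper over a map that may be updated at runtime.
final class StaticVsDynamicSketch {
    static final String STATIC_KEY = "sketch.static.timeout.ms"; // hypothetical key
    static final String DYNAMIC_KEY = "sketch.dynamic.limit";    // hypothetical key

    private final Map<String, Integer> live;
    private final int staticTimeoutMs;

    StaticVsDynamicSketch(Map<String, Integer> live) {
        this.live = live;
        // Like a Scala `val`: read once at construction and never again.
        this.staticTimeoutMs = live.get(STATIC_KEY);
    }

    // Attribute-backed accessor: a plain field read.
    int staticTimeoutMs() {
        return staticTimeoutMs;
    }

    // Like a Scala `def`: looked up on every call so that updates are visible.
    int dynamicLimit() {
        return live.get(DYNAMIC_KEY);
    }
}
```

If a config later becomes dynamic, its accessor would have to move from the first style to the second, which is the caveat raised in the comment above.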

Contributor

chia7712 commented Jul 3, 2024

I double-checked all the configs and yes, none of them are dynamic. Maybe we don't need to over-engineer this for now. Hence, we can have a follow-up PR for the following changes.

  1. Re-introduce attributes to GroupCoordinatorConfig (@dajac's comment).
  2. Move the GroupCoordinatorConfig validation from KafkaConfig to GroupCoordinatorConfig (@OmniaGM's comment).
  3. Do the validation in the constructor of GroupCoordinatorConfig. I guess this is a bit different from @OmniaGM's idea (#16506, "KAFKA-15853: Refactor ShareGroupConfig with AbstractConfig", adds a validate method to ShareGroupConfig instead of validating configs in construction). I prefer to validate all configs in construction, as KafkaConfig does its validation in construction too.
  4. Add docs to GroupCoordinatorConfig to explain why we add attributes.

@brandboat @dajac @OmniaGM PTAL, I hope this can be a guideline for all similar config classes.

Contributor

chia7712 commented Jul 4, 2024

@brandboat I opened https://issues.apache.org/jira/browse/KAFKA-17081 as a follow-up, and it is assigned to you. Please feel free to assign it back to me if you have no bandwidth.

Contributor

dajac commented Jul 4, 2024

@chia7712 Thanks. Sounds good to me!

4 participants